跳到主要内容

SpringCloud Bus 消息总线

参考资料 Spring Cloud Bus 消息总线官方文档 参考资料 Spring Cloud入门-Bus消息总线(Hoxton版本)

什么是消息总线

参考资料 Spring Cloud Bus 消息总线官方文档

消息代理中间件构建一个共用的消息主题让所有微服务实例订阅,当该消息主题产生消息时会被所有微服务实例监听和消费。

消息代理又是什么?

消息代理是一个消息验证、传输、路由的架构模式,主要用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它在微服务之间起到通信调度作用,减少了服务之间的依赖。

通常会使用消息代理来构建一个主题,然后把微服务架构中的所有服务都连接到这个主题上去,当我们向该主题发送消息时,所有订阅该主题的服务都会收到消息并进行消费。

使用 Spring Cloud Bus 可以方便地构建起这套机制,所以 Spring Cloud Bus 又被称为消息总线。

Spring Cloud Bus 配合 Spring Cloud Config 使用可以实现配置的动态刷新。目前 Spring Cloud Bus 支持两种消息代理:RabbitMQ 和 Kafka。

什么时候使用 Spring Cloud Bus

微服务一般都采用集群方式部署,而且在高并发下经常需要对服务进行扩容、缩容、上线、下线的操作。比如我们需要更新配置,又或者需要同时失效所有服务器上的某个缓存,需要向所有相关的服务器发送命令,此时就可以选择使用 Spring Cloud Bus 了。

总的来说,就是在我们 需要把一个操作散发到所有后端相关服务器的时候,就可以选择使用 Spring Cloud Bus 了。

例如下图通过 Spring Cloud Bus 实现微服务架构的配置刷新。

配置刷新的原理

有两种方式

1、利用消息总线触发一个客户端 /bus/refresh,而刷新所有客户端的配置

2、利用消息总线触发一个服务端 ConfigServer 的 /bus/refresh 端点,而刷新所有客户端的配置

但是一般都使用第二种方式,因为第一种打破了微服务的职责单一性,微服务本身是业务模块,它本不应该承担配置刷新的职责。而且它有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改

配置 RabbitMQ 环境

具体的搭建方式看 RabbitMQ 那篇笔记

进入控制台

ConfigServer 配置

在 ConfigServer 加上以下依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

<!-- 一定要添加这个,因为就是通过它暴露端点的 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>

修改配置文件,主要是添加了 RabbitMQ 的配置及暴露了刷新配置的 Actuator 端点;

server:
port: 3344

spring:
application:
name: "config-center"
cloud:
config:
server:
git:
uri: https://gitee.com/alsritter/temp-config.git
# 如果配置文件丢到了子目录,可以添加搜索目录(默认情况下,仅搜索根目录)
search-paths:
- cloud-config
basedir:
# 读取分支
label: master

# rabbit相关配置
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin

eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka # 注册进 eureka

# 凡是暴露监控、刷新的都要有 actuator 依赖,bus-refresh 就是 actuator 的刷新操作
# rabbitmq 相关配置,暴露 bus 刷新配置的端点
management:
endpoints:
web:
exposure:
include: "*" #'bus-refresh' # 暴露 bus 刷新配置的端点

ConfigClient 配置

客户端也加上这个配置

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

<!-- 客户端也需要暴露端点 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

修改客户端的 bootstrap.yml 文件

server:
port: 3366

spring:
application:
name: config-client
cloud:
# Config 客户端配置
config:
label: master # 分支名称
name: config # 配置文件名称
profile: dev # 读取后缀名称 上述 3个综合:master分支上config-dev.yml的配置文件被读取 http://localhost:3344/master/config-dev.yml
uri: http://localhost:3344 #配置中心地址
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
# 服务注册到 eureka 地址
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka

# 暴露监控端点
management:
endpoints:
web:
exposure:
include: "*" #'refresh'

编写一个 Controller,用来读取配置文件

@RestController
@RefreshScope
public class ConfigClientController {
@Value("${server.port}")
private String serverPort;
@Value(("${config.info}"))
private String configInfo;

@GetMapping("/configInfo")
public String getConfigInfo() {
return "serverPort:" + serverPort + "\t\n" +
"configInfo: " + configInfo;
}
}

测试配置的更新

先启动服务,检查 RabbitMQ 发现已经自动创建了一个队列

这时访问一下服务端能否读取到最新的文件

http://localhost:3344/master/config-dev.yml

这时使用 POST 请求访问这个配置中心

curl -X POST http://localhost:3344/actuator/bus-refresh

这时访问客户端,发现它的配置也被修改了

配置 WebHook

但是还需要手动去请求这个地址还是太麻烦了,所以下面使用远程库的钩子

这里使用远程仓库提供的 WebHook,WebHook 是一个 Web 自定义回调函数,当程序发生警报行为时,会自动回调调用指定的 URL。WebHook 的回调 URL 可以是第三方应用,也可以是 WebHook 内应用(就是一个钩子,在数据来的时候自动调用发送 Web 消息给指定的 URL)

执行流程如下

image.png

在 Gitee 就能设置(管理 > WebHooks)